# LocalStack Resource Provider Scaffolding v2
from __future__ import annotations

import copy
from pathlib import Path
from typing import Optional, TypedDict

import localstack.services.cloudformation.provider_utils as util
from localstack.aws.api.es import CreateElasticsearchDomainRequest
from localstack.services.cloudformation.resource_provider import (
    OperationStatus,
    ProgressEvent,
    ResourceProvider,
    ResourceRequest,
)
from localstack.utils.collections import convert_to_typed_dict


class ElasticsearchDomainProperties(TypedDict):
    AccessPolicies: Optional[dict]
    AdvancedOptions: Optional[dict]
    AdvancedSecurityOptions: Optional[AdvancedSecurityOptionsInput]
    Arn: Optional[str]
    CognitoOptions: Optional[CognitoOptions]
    DomainArn: Optional[str]
    DomainEndpoint: Optional[str]
    DomainEndpointOptions: Optional[DomainEndpointOptions]
    DomainName: Optional[str]
    EBSOptions: Optional[EBSOptions]
    ElasticsearchClusterConfig: Optional[ElasticsearchClusterConfig]
    ElasticsearchVersion: Optional[str]
    EncryptionAtRestOptions: Optional[EncryptionAtRestOptions]
    Id: Optional[str]
    LogPublishingOptions: Optional[dict]
    NodeToNodeEncryptionOptions: Optional[NodeToNodeEncryptionOptions]
    SnapshotOptions: Optional[SnapshotOptions]
    Tags: Optional[list[Tag]]
    VPCOptions: Optional[VPCOptions]


class ZoneAwarenessConfig(TypedDict):
    AvailabilityZoneCount: Optional[int]


class ColdStorageOptions(TypedDict):
    Enabled: Optional[bool]


class ElasticsearchClusterConfig(TypedDict):
    ColdStorageOptions: Optional[ColdStorageOptions]
    DedicatedMasterCount: Optional[int]
    DedicatedMasterEnabled: Optional[bool]
    DedicatedMasterType: Optional[str]
    InstanceCount: Optional[int]
    InstanceType: Optional[str]
    WarmCount: Optional[int]
    WarmEnabled: Optional[bool]
    WarmType: Optional[str]
    ZoneAwarenessConfig: Optional[ZoneAwarenessConfig]
    ZoneAwarenessEnabled: Optional[bool]


class SnapshotOptions(TypedDict):
    AutomatedSnapshotStartHour: Optional[int]


class VPCOptions(TypedDict):
    SecurityGroupIds: Optional[list[str]]
    SubnetIds: Optional[list[str]]


class NodeToNodeEncryptionOptions(TypedDict):
    Enabled: Optional[bool]


class DomainEndpointOptions(TypedDict):
    CustomEndpoint: Optional[str]
    CustomEndpointCertificateArn: Optional[str]
    CustomEndpointEnabled: Optional[bool]
    EnforceHTTPS: Optional[bool]
    TLSSecurityPolicy: Optional[str]


class CognitoOptions(TypedDict):
    Enabled: Optional[bool]
    IdentityPoolId: Optional[str]
    RoleArn: Optional[str]
    UserPoolId: Optional[str]


class MasterUserOptions(TypedDict):
    MasterUserARN: Optional[str]
    MasterUserName: Optional[str]
    MasterUserPassword: Optional[str]


class AdvancedSecurityOptionsInput(TypedDict):
    AnonymousAuthEnabled: Optional[bool]
    Enabled: Optional[bool]
    InternalUserDatabaseEnabled: Optional[bool]
    MasterUserOptions: Optional[MasterUserOptions]


class EBSOptions(TypedDict):
    EBSEnabled: Optional[bool]
    Iops: Optional[int]
    VolumeSize: Optional[int]
    VolumeType: Optional[str]


class EncryptionAtRestOptions(TypedDict):
    Enabled: Optional[bool]
    KmsKeyId: Optional[str]


class Tag(TypedDict):
    Key: Optional[str]
    Value: Optional[str]


REPEATED_INVOCATION = "repeated_invocation"


class ElasticsearchDomainProvider(ResourceProvider[ElasticsearchDomainProperties]):
    TYPE = "AWS::Elasticsearch::Domain"  # Autogenerated. Don't change
    SCHEMA = util.get_schema_path(Path(__file__))  # Autogenerated. Don't change

    def create(
        self,
        request: ResourceRequest[ElasticsearchDomainProperties],
    ) -> ProgressEvent[ElasticsearchDomainProperties]:
        """
        Create a new resource.

        Primary identifier fields:
          - /properties/Id



        Create-only properties:
          - /properties/DomainName

        Read-only properties:
          - /properties/Id
          - /properties/DomainArn
          - /properties/DomainEndpoint
          - /properties/Arn



        """
        model = request.desired_state

        client = request.aws_client_factory.es
        if not request.custom_context.get(REPEATED_INVOCATION):
            # this is the first time this callback is invoked
            request.custom_context[REPEATED_INVOCATION] = True

            # defaults
            domain_name = model.get("DomainName")
            if not domain_name:
                model["DomainName"] = util.generate_default_name(
                    request.stack_name, request.logical_resource_id
                )

            params = copy.deepcopy(model)
            params = convert_to_typed_dict(CreateElasticsearchDomainRequest, params)
            params = util.remove_none_values(params)
            cluster_config = params.get("ElasticsearchClusterConfig")
            if isinstance(cluster_config, dict):
                # set defaults required for boto3 calls
                cluster_config.setdefault("DedicatedMasterType", "m3.medium.elasticsearch")
                cluster_config.setdefault("WarmType", "ultrawarm1.medium.elasticsearch")

            client.create_elasticsearch_domain(**params)

        domain = client.describe_elasticsearch_domain(DomainName=model["DomainName"])
        if domain["DomainStatus"]["Created"]:
            # set data
            model["Arn"] = domain["DomainStatus"]["ARN"]
            model["Id"] = model["DomainName"]
            model["DomainArn"] = domain["DomainStatus"]["ARN"]
            model["DomainEndpoint"] = domain["DomainStatus"].get("Endpoint")

            if tags := model.get("Tags", []):
                client.add_tags(ARN=model["Arn"], TagList=tags)

            return ProgressEvent(status=OperationStatus.SUCCESS, resource_model=model)
        else:
            return ProgressEvent(status=OperationStatus.IN_PROGRESS, resource_model=model)

    def read(
        self,
        request: ResourceRequest[ElasticsearchDomainProperties],
    ) -> ProgressEvent[ElasticsearchDomainProperties]:
        """
        Fetch resource information


        """
        raise NotImplementedError

    def delete(
        self,
        request: ResourceRequest[ElasticsearchDomainProperties],
    ) -> ProgressEvent[ElasticsearchDomainProperties]:
        """
        Delete a resource


        """
        client = request.aws_client_factory.es
        # TODO the delete is currently synchronous;
        #   if this changes, we should also reflect the OperationStatus here
        client.delete_elasticsearch_domain(DomainName=request.previous_state["DomainName"])
        return ProgressEvent(status=OperationStatus.SUCCESS, resource_model={})

    def update(
        self,
        request: ResourceRequest[ElasticsearchDomainProperties],
    ) -> ProgressEvent[ElasticsearchDomainProperties]:
        """
        Update a resource


        """
        raise NotImplementedError
